/**
 * Run one task through the processing pipeline with async.waterfall.
 *
 * The waterfall is seeded with `task`, then runs get_hpms_aadt and
 * get_detector_routes against it via async.applyEach, and finally hands
 * the task through each remaining step in order.
 *
 * @param {Object} task - the task object passed from step to step
 * @param {Function} callback - node-style callback; async.waterfall invokes
 *     it with the first error, or with whatever the last step produces
 * @returns {null} always null; results are delivered through `callback`
 */
function processing_waterfall(task, callback) {
    // run through the waterfall of tasks
    async.waterfall(
        [
            // seed the waterfall with the task itself
            function (cb) {
                return cb(null, task);
            },
            // ,in_process  (step currently disabled)
            function (t, cb) {
                async.applyEach(
                    [get_hpms_aadt, get_detector_routes],
                    t,
                    function (err) {
                        // Forward failures instead of silently dropping them,
                        // so later steps never run on a task whose queries
                        // did not complete.
                        if (err) {
                            return cb(err);
                        }
                        // Whatever applyEach reports on success is ignored;
                        // the task object `t` is what gets passed on.
                        return cb(null, t);
                    }
                );
                return null;
            },
            post_process_sql_queries,
            get_detector_fractions,
            get_hpms_fractions,
            reduce.post_process_couch_query,
            reduce.apply_fractions
        ],
        callback
    );
    return null;
}
async_waterfall
Each element is an async function
Each element expects to get the results of the previous function
In practice, each task item (one grid’s state) gets large
reduce
/**
 * Fold one item's data into the running accumulator `memo`.
 *
 * `memo` is mutated in place and is keyed first by the keys of
 * `item.accum` / `item.detector_data` (called `ts` here). Under each `ts`:
 *   - every road class found in `item.accum[ts]` holds field-by-field sums;
 *   - `detector_based` holds sums built from `item.detector_data[ts]`,
 *     with n, hh and not_hh multiplied by the record's miles.
 *
 * @param {Object} memo - accumulator, modified in place
 * @param {Object} item - object carrying `accum` and `detector_data`
 * @param {Function} callback - node-style callback, called as (null, memo)
 * @returns {*} whatever `callback` returns
 */
function reduce(memo, item, callback) {
    // stash item's street data into memo
    _.each(item.accum, function (roads, ts) {
        if (memo[ts] === undefined) {
            // NOTE(review): `_.clone(x, true)` is only a deep clone in older
            // lodash releases; confirm the library version in use, otherwise
            // memo would share nested records with `item`.
            memo[ts] = _.clone(roads, true);
            return;
        }
        _.each(roads, function (record, road_class) {
            var target = memo[ts][road_class];
            if (target === undefined) {
                memo[ts][road_class] = _.clone(record, true);
                return;
            }
            _.each(record, function (v, k) {
                // A field that this record has but the accumulated one lacks
                // starts from v; `undefined + v` would poison the sum with NaN.
                target[k] = target[k] === undefined ? v : target[k] + v;
            });
        });
    });

    // stash item's freeway data into memo
    _.each(item.detector_data, function (record, ts) {
        // could also insert speed here into the sum by
        // multiplying by n to weight it, as I do elsewhere
        var detector_miles = record[unmapper.miles];
        if (memo[ts] === undefined) {
            memo[ts] = {};
        }
        var totals = memo[ts]['detector_based'];
        if (totals === undefined) {
            memo[ts]['detector_based'] = {
                'n': record[unmapper.n],
                'n_mt': record[unmapper.n] * detector_miles,
                'hh_mt': record[unmapper.hh] * detector_miles,
                'nhh_mt': record[unmapper.not_hh] * detector_miles,
                'lane_miles': record[unmapper.lane_miles]
            };
        } else {
            totals.n += record[unmapper.n];
            totals.n_mt += record[unmapper.n] * detector_miles;
            totals.hh_mt += record[unmapper.hh] * detector_miles;
            totals.nhh_mt += record[unmapper.not_hh] * detector_miles;
            totals.lane_miles += record[unmapper.lane_miles];
        }
    });

    return callback(null, memo);
}
reduce
/**
 * Abbreviated outline of reduce(): the two accumulation phases are elided
 * and only the overall shape remains.
 *
 * @param {Object} memo - accumulator handed back through the callback
 * @param {Object} item - the item whose data would be folded into memo
 * @param {Function} callback - node-style callback, called as (null, memo)
 * @returns {*} whatever `callback` returns
 */
function reduce(memo, item, callback) {
    // stash item's street data into memo
    // ...code...
    // stash item's freeway data into memo
    // ...code...
    return callback(null, memo);
}
It worked when I tested
a city worked fine
and small counties too,
but it didn’t scale to a full county
The problem
The async.reduce step was passed a list of “tasks”
Each task was an object, and accumulated its own state from the various database calls
At the end of querying a county, I’d have hundreds of grid “tasks” with hour by hour data on traffic flow still being referenced
Solution, garbage collecting
I hate managing memory
It’s why I stopped programming in 68000 assembly language